package org.itew.mymapreduce.core;

import java.util.Set;

import org.itew.mymapreduce.io.Writable;

/**
 * Abstraction of a single reduce task: it feeds every key of its key set,
 * together with the values stored for that key, to a {@link Reducer} and lets
 * the reducer collect its results through the supplied writer.
 *
 * <p>The class implements {@link Runnable}, so the whole task is executed by
 * calling {@link #run()}.
 *
 * @param <IK> type of the keys iterated by this task
 * @param <IV> second type parameter of the input {@code DataCollection}
 *             (presumably the input value type)
 * @param <OK> first type parameter of the output {@code Writable}
 *             (presumably the output key type)
 * @param <OV> second type parameter of the output {@code Writable}
 *             (presumably the output value type)
 * @author JiLu
 */
public class BasicReducerRunner<IK,IV,OK,OV> implements Runnable{
	
	/** Collection queried with {@code getValues(key)} for each key. */
	private final DataCollection<IK,IV> mergedDataCollection;
	/** Reducer invoked once per key. */
	private final Reducer<IK,IV,OK,OV> reducer;
	/** Keys this task iterates over. */
	private final Set<IK> keySet;
	/** Writer handed to the reducer on every call and closed when the task ends. */
	private final Writable<OK,OV> writer;
	
	/**
	 * Creates a reduce task.
	 *
	 * @param mergedDataCollection collection from which the values of each key are fetched
	 * @param reducer              reducer to invoke for each key
	 * @param keySet               keys to process
	 * @param writer               writer passed to the reducer; closed by {@link #run()}
	 */
	public BasicReducerRunner(DataCollection<IK,IV> mergedDataCollection,Reducer<IK,IV,OK,OV> reducer,Set<IK> keySet,Writable<OK,OV> writer){
		this.mergedDataCollection = mergedDataCollection;
		this.reducer = reducer;
		this.keySet = keySet;
		this.writer = writer;
	}

	/**
	 * Calls the reducer once for every key in the key set, then closes the writer.
	 *
	 * <p>The writer is closed even when processing a key throws, so a failing
	 * reducer does not leave the writer open. If both the processing and
	 * {@code close()} throw, the exception from {@code close()} is the one that
	 * propagates.
	 */
	@Override
	public void run() {
		try {
			for(IK key : keySet){
				reducer.reduce(key, mergedDataCollection.getValues(key), writer);
			}
		} finally {
			// Release the writer on both the normal and the exceptional path.
			writer.close();
		}
	}
	
}
